package cn.doitedu.jobs;

import cn.doitedu.pojo.LogBean;
import cn.doitedu.pojo.MatchResult;
import cn.doitedu.udfs.JsonToBeanFunction;
import cn.doitedu.udfs.RulesMatchFunction;
import cn.doitedu.udfs.RulesMatchFunctionV2;
import cn.doitedu.utils.FlinkUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/**
 * 对Demo1的改造
 * 将规则数据以参数的形式传入，而不是将规则写死在程序中
 *
 * 目前规则引擎V1.0存在的问题
 *
 * 	1.规则没有通过广播状态实时添加，实时修改、实时停用规则
 * 	2.所有的历史行为数据都是查询Clickhouse，对其压力非常大，甚至影响其正常的功能使用（ClickHouse的并发能力较差，会替换StarRocks）
 *  3.没有使用状态和缓存
 *  4.目前规则也不太丰富，不能灵活配置规则的逻辑关系（并且、或者等）
 *
 *
 *
 */
public class Demo2 {

    public static void main(String[] args) throws Exception {

        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args[0], SimpleStringSchema.class);

        //数据转换清洗
        SingleOutputStreamOperator<LogBean> beanStream = kafkaStream.process(new JsonToBeanFunction());

        //匹配规则
        KeyedStream<LogBean, String> keyedStream = beanStream.keyBy(LogBean::getDeviceId);
        SingleOutputStreamOperator<MatchResult> res = keyedStream.process(new RulesMatchFunctionV2());

        res.print();

        FlinkUtils.env.execute();


    }

}
